../src/siri/net/promise.c \
../src/siri/net/promises.c \
../src/siri/net/protocol.c \
-../src/siri/net/socket.c
+../src/siri/net/socket.c \
+../src/siri/net/pipe.c
OBJS += \
./src/siri/net/bserver.o \
./src/siri/net/promise.o \
./src/siri/net/promises.o \
./src/siri/net/protocol.o \
-./src/siri/net/socket.o
+./src/siri/net/socket.o \
+./src/siri/net/pipe.o
C_DEPS += \
./src/siri/net/bserver.d \
./src/siri/net/promise.d \
./src/siri/net/promises.d \
./src/siri/net/protocol.d \
-./src/siri/net/socket.d
+./src/siri/net/socket.d \
+./src/siri/net/pipe.d
# Each subdirectory must supply rules for building sources it contributes
gcc -DDEBUG=1 -I../include -O0 -g3 -Wall -Wextra $(CPPFLAGS) $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<"
@echo 'Finished building: $<'
@echo ' '
-
-
../src/siri/net/promise.c \
../src/siri/net/promises.c \
../src/siri/net/protocol.c \
-../src/siri/net/socket.c
+../src/siri/net/socket.c \
+../src/siri/net/pipe.c
OBJS += \
./src/siri/net/bserver.o \
./src/siri/net/promise.o \
./src/siri/net/promises.o \
./src/siri/net/protocol.o \
-./src/siri/net/socket.o
+./src/siri/net/socket.o \
+./src/siri/net/pipe.o
C_DEPS += \
./src/siri/net/bserver.d \
./src/siri/net/promise.d \
./src/siri/net/promises.d \
./src/siri/net/protocol.d \
-./src/siri/net/socket.d
+./src/siri/net/socket.d \
+./src/siri/net/pipe.d
# Each subdirectory must supply rules for building sources it contributes
gcc -I../include -O3 -Wall -Wextra $(CPPFLAGS) $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<"
@echo 'Finished building: $<'
@echo ' '
-
-
uint8_t shard_compression;
char server_address[SIRI_CFG_MAX_LEN_ADDRESS];
char default_db_path[SIRI_PATH_MAX];
+ uint8_t pipe_support;
+ char pipe_client_name[SIRI_PATH_MAX];
} siri_cfg_t;
void siri_cfg_init(siri_t * siri);
#pragma once
#include <uv.h>
+#include <siri/net/pipe.h>
+#include <siri/net/socket.h>
#include <siri/siri.h>
typedef struct siri_s siri_t;
+#define sirinet_client_incref(client) \
+switch ((client)->type) \
+{ \
+case UV_TCP: \
+ sirinet_socket_incref(client); \
+ break; \
+case UV_NAMED_PIPE: \
+ sirinet_pipe_incref(client); \
+ break; \
+default: \
+ break; \
+}
+
+#define sirinet_client_decref(client) \
+switch ((client)->type) \
+{ \
+case UV_TCP: \
+ sirinet_socket_decref(client); \
+ break; \
+case UV_NAMED_PIPE: \
+ sirinet_pipe_decref(client); \
+ break; \
+default: \
+ uv_close((uv_handle_t *) (client), NULL); \
+ break; \
+}
+
+#define CLIENT_SIRIDB(client, siridb) \
+siridb_t * siridb = NULL; \
+switch ((client)->type) \
+{ \
+case UV_TCP: \
+ siridb = ((sirinet_socket_t *) (client)->data)->siridb; \
+ break; \
+case UV_NAMED_PIPE: \
+ siridb = ((sirinet_pipe_t *) (client)->data)->siridb; \
+ break; \
+default: \
+ break; \
+}
+
+#define CLIENT_USER(client, user) \
+siridb_user_t * user = NULL; \
+switch ((client)->type) \
+{ \
+case UV_TCP: \
+ user = (siridb_user_t *) ((sirinet_socket_t *) (client)->data)->origin; \
+ break; \
+case UV_NAMED_PIPE: \
+ user = (siridb_user_t *) ((sirinet_pipe_t *) (client)->data)->origin; \
+ break; \
+default: \
+ break; \
+}
+
int sirinet_clserver_init(siri_t * siri);
typedef ssize_t (*sirinet_clserver_getfile)(char ** buffer, siridb_t * siridb);
--- /dev/null
+#pragma once
+
+#include <uv.h>
+#include <siri/db/db.h>
+#include <siri/net/pkg.h>
+#include <xpath/xpath.h>
+
+#define PIPE_NAME_SZ SIRI_PATH_MAX
+#define RESET_BUF_SIZE 1048576 /* 1 MB */
+
+typedef enum sirinet_pipe_tp
+{
+ PIPE_CLIENT,
+ PIPE_BACKEND
+} sirinet_pipe_tp_t;
+
+typedef struct siridb_s siridb_t;
+typedef struct siridb_user_s siridb_user_t;
+
+typedef void (* on_data_cb_t)(uv_stream_t * client, sirinet_pkg_t * pkg);
+typedef void (* on_free_cb_t)(uv_stream_t * client);
+
+typedef struct sirinet_pipe_s
+{
+ sirinet_pipe_tp_t tp;
+ uint32_t ref;
+ on_data_cb_t on_data;
+ on_free_cb_t on_free;
+ siridb_t * siridb;
+ void * origin; /* can be a user, server or NULL */
+ char * buf;
+ size_t len;
+ size_t size;
+ uv_pipe_t pipe;
+} sirinet_pipe_t;
+
+uv_pipe_t * sirinet_pipe_new(
+ sirinet_pipe_tp_t tp,
+ on_data_cb_t cb_data,
+ on_free_cb_t cb_free);
+void sirinet_pipe_alloc_buffer(
+ uv_handle_t * handle,
+ size_t suggested_size,
+ uv_buf_t * buf);
+int sirinet_pipe_name(char * buffer, uv_stream_t * client);
+void sirinet_pipe_on_data(
+ uv_stream_t * client,
+ ssize_t nread,
+ const uv_buf_t * buf);
+void sirinet__pipe_free(uv_stream_t * client);
+
+#define sirinet_pipe_incref(client) \
+ ((sirinet_pipe_t *) client->data)->ref++
+
+#define sirinet_pipe_decref(client) \
+ if (!--((sirinet_pipe_t *) client->data)->ref) \
+ uv_close((uv_handle_t *) client, (uv_close_cb) sirinet__pipe_free)
max_open_files = 32768
#
-# Use shard compression for storing data points.
+# Use shard compression for storing data points.
# Set value 0 to disable shard compression.
#
enable_shard_compression = 1
+
+#
+# Enable named pipe support for client connections.
+#
+enable_pipe_support = 0
+
+#
+# SiriDB will bind the client named pipe in this location.
+#
+pipe_client_name = siridb_client.sock
.ip_support=IP_SUPPORT_ALL,
.shard_compression=0,
.server_address="localhost",
- .default_db_path="/var/lib/siridb/"
+ .default_db_path="/var/lib/siridb/",
+ .pipe_support=0,
+ .pipe_client_name="siridb_client.sock"
};
static void SIRI_CFG_read_uint(
cfgparser_t * cfgparser,
const char * option_name,
char ** dest);
+static void SIRI_CFG_read_pipe_name(
+ cfgparser_t * cfgparser,
+ const char * option_name,
+ char * dest);
static void SIRI_CFG_read_default_db_path(cfgparser_t * cfgparser);
static void SIRI_CFG_read_max_open_files(cfgparser_t * cfgparser);
static void SIRI_CFG_read_ip_support(cfgparser_t * cfgparser);
static void SIRI_CFG_read_shard_compression(cfgparser_t * cfgparser);
+static void SIRI_CFG_read_pipe_support(cfgparser_t * cfgparser);
void siri_cfg_init(siri_t * siri)
{
"bind_server_address",
&siri_cfg.bind_backend_addr);
+ SIRI_CFG_read_pipe_support(cfgparser);
+
+ if (siri_cfg.pipe_support)
+ {
+ SIRI_CFG_read_pipe_name(
+ cfgparser,
+ "pipe_client_name",
+ &siri_cfg.pipe_client_name);
+ }
+
cfgparser_free(cfgparser);
}
}
+static void SIRI_CFG_read_pipe_support(cfgparser_t * cfgparser)
+{
+ cfgparser_option_t * option;
+ cfgparser_return_t rc;
+ rc = cfgparser_get_option(
+ &option,
+ cfgparser,
+ "siridb",
+ "enable_pipe_support");
+ if (rc != CFGPARSER_SUCCESS)
+ {
+ log_debug(
+ "Missing '%s' in '%s': %s. "
+ "Disable pipe support",
+ "enable_pipe_support",
+ siri.args->config,
+ cfgparser_errmsg(rc));
+ }
+ else if (option->tp != CFGPARSER_TP_INTEGER || option->val->integer > 1)
+ {
+ log_warning(
+ "Error reading '%s' in '%s': %s. "
+ "Disable pipe support",
+ "enable_pipe_support",
+ siri.args->config,
+ "error: expecting 0 or 1");
+ }
+ else if (option->val->integer == 1)
+ {
+ siri_cfg.pipe_support = 1;
+ }
+
+}
+
static void SIRI_CFG_read_addr(
cfgparser_t * cfgparser,
const char * option_name,
}
}
+static void SIRI_CFG_read_pipe_name(
+ cfgparser_t * cfgparser,
+ const char * option_name,
+ char * dest)
+{
+ cfgparser_option_t * option;
+ cfgparser_return_t rc;
+ size_t len;
+ rc = cfgparser_get_option(
+ &option,
+ cfgparser,
+ "siridb",
+ option_name);
+ if (rc != CFGPARSER_SUCCESS)
+ {
+ log_warning(
+ "Error reading '%s' in '%s': %s. "
+ "Using default value: '%s'",
+ option_name,
+ siri.args->config,
+ cfgparser_errmsg(rc),
+ dest);
+ }
+ else if (option->tp != CFGPARSER_TP_STRING)
+ {
+ log_warning(
+ "Error reading '%s' in '%s': %s. "
+ "Using default value: '%s'",
+ option_name,
+ siri.args->config,
+ "error: expecting a string value",
+ dest);
+ }
+ else
+ {
+ *dest = 0;
+
+ /* keep space left for a terminator char */
+ strncpy(dest,
+ option->val->string,
+ SIRI_PATH_MAX - 1);
+
+ len = strlen(dest);
+
+ if (len == SIRI_PATH_MAX - 1)
+ {
+ log_warning(
+ "Default '%s' path exceeds %d characters, please "
+ "check your configuration file: %s",
+ option_name,
+ SIRI_PATH_MAX - 2,
+ siri.args->config);
+ }
+ }
+}
+
static void SIRI_CFG_read_default_db_path(cfgparser_t * cfgparser)
{
cfgparser_option_t * option;
return CPROTO_ERR_AUTH_CREDENTIALS;
}
- ((sirinet_socket_t *) client->data)->siridb = siridb;
- ((sirinet_socket_t *) client->data)->origin = user;
+ switch (client->type)
+ {
+ case UV_TCP:
+ ((sirinet_socket_t *) client->data)->siridb = siridb;
+ ((sirinet_socket_t *) client->data)->origin = user;
+ break;
+ case UV_NAMED_PIPE:
+ ((sirinet_pipe_t *) client->data)->siridb = siridb;
+ ((sirinet_pipe_t *) client->data)->origin = user;
+ break;
+ }
+
siridb_user_incref(user);
return CPROTO_RES_AUTH_SUCCESS;
return BPROTO_AUTH_ERR_UNKNOWN_UUID;
}
- ((sirinet_socket_t *) client->data)->siridb = siridb;
- ((sirinet_socket_t *) client->data)->origin = server;
+ switch (client->type)
+ {
+ case UV_TCP:
+ ((sirinet_socket_t *) client->data)->siridb = siridb;
+ ((sirinet_socket_t *) client->data)->origin = server;
+ break;
+ case UV_NAMED_PIPE:
+ ((sirinet_pipe_t *) client->data)->siridb = siridb;
+ ((sirinet_pipe_t *) client->data)->origin = server;
+ break;
+ }
free(server->version);
server->version = strdup((const char *) qp_version->via.raw);
return BPROTO_AUTH_SUCCESS;
}
-
#include <siri/err.h>
#include <siri/net/promises.h>
#include <siri/net/protocol.h>
-#include <siri/net/socket.h>
+#include <siri/net/clserver.h>
#include <siri/siri.h>
#include <stdio.h>
#include <string.h>
insert->npoints= npoints;
/* increment the client reference counter */
- sirinet_socket_incref(insert->client);
+ sirinet_client_incref(insert->client);
uv_async_init(siri.loop, handle, INSERT_points_to_pools);
handle->data = (void *) insert;
}
qp_unpacker_init(&ilocal->unpacker, promise->pkg->data, promise->pkg->len);
- sirinet_socket_incref(client);
+ sirinet_client_incref(client);
promise->data = client;
promise->cb = (sirinet_promise_cb) INSERT_local_promise_backend_cb;
sirinet_pkg_t * pkg;
sirinet_promise_t * promise;
siridb_insert_t * insert = (siridb_insert_t *) handle->data;
- siridb_t * siridb =
- ((sirinet_socket_t *) insert->client->data)->siridb;
+ CLIENT_SIRIDB(insert->client, siridb)
int n = 0;
char msg[MAX_INSERT_MSG];
{
sirinet_pkg_send(client, pkg);
}
- sirinet_socket_decref(client);
+ sirinet_client_decref(client);
sirinet_promise_decref(promise);
}
static void INSERT_points_to_pools(uv_async_t * handle)
{
siridb_insert_t * insert = (siridb_insert_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) insert->client->data)->siridb;
+ CLIENT_SIRIDB(insert->client, siridb)
+
uint16_t pool = siridb->server->pool;
sirinet_pkg_t * pkg, * repl_pkg;
sirinet_promises_t * promises = sirinet_promises_new(
siridb_insert_t * insert = (siridb_insert_t *) handle->data;
/* decrement the client reference counter */
- sirinet_socket_decref(insert->client);
+ sirinet_client_decref(insert->client);
/* free insert */
siridb_insert_free(insert);
free((uv_async_t *) handle);
}
-
-
#include <siri/db/walker.h>
#include <siri/net/clserver.h>
#include <siri/net/pkg.h>
-#include <siri/net/socket.h>
+#include <siri/net/clserver.h>
#include <siri/parser/listener.h>
#include <siri/parser/queries.h>
#include <siri/siri.h>
float factor,
int flags)
{
- siridb_t * siridb;
uv_async_t * handle = (uv_async_t *) malloc(sizeof(uv_async_t));
if (handle == NULL)
{
query->pid = pid;
/* increment client reference counter */
- sirinet_socket_incref(client);
+ sirinet_client_incref(client);
query->client = client;
query->flags = flags;
log_debug("Parsing query (%d): %s", query->flags, query->q);
}
+ CLIENT_SIRIDB(query->client, siridb)
+
/* increment active tasks */
- siridb = ((sirinet_socket_t *) query->client->data)->siridb;
siridb_tasks_inc(siridb->tasks);
/* send next call */
void siridb_query_free(uv_handle_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
/* decrement active tasks */
siridb_tasks_dec(siridb->tasks);
}
/* decrement client reference counter */
- sirinet_socket_decref(query->client);
+ sirinet_client_decref(query->client);
/* free query */
free(query);
int flags)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
/*
* the size is important here, we will use the alloc_size to guess the
#ifndef DEBUG
/* production version returns timestamp now */
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
+
qp_add_raw(query->packer, (const unsigned char *) "calc", 4);
uint64_t ts = siridb_time_now(siridb, query->start);
{
int rc;
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
+
siridb_walker_t * walker = siridb_walker_new(
siridb,
siridb_time_now(siridb, query->start),
char buffer[packer->alloc_size];
size_t size = packer->alloc_size;
+ CLIENT_SIRIDB(query->client, siridb)
+
rc = QUERY_rebuild(
- ((sirinet_socket_t *) query->client->data)->siridb,
+ siridb,
query->pr->tree->children->node,
buffer,
&size,
sirinet_pkg_send(client, package);
}
}
-
#include <siri/net/clserver.h>
#include <siri/net/promises.h>
#include <siri/net/protocol.h>
-#include <siri/net/socket.h>
#include <siri/siri.h>
#include <siri/version.h>
#include <stdbool.h>
*/
#define MAX_QUERY_PKG_SIZE 65535
-
#define DEFAULT_BACKLOG 128
-#define CHECK_SIRIDB(ssocket) \
-sirinet_socket_t * ssocket = client->data; \
-if (ssocket->siridb == NULL) \
+#define CHECK_SIRIDB(client, siridb) \
+CLIENT_SIRIDB(client, siridb) \
+if ((siridb) == NULL) \
{ \
sirinet_pkg_t * package; \
package = sirinet_pkg_new(pkg->pid, 0, CPROTO_ERR_NOT_AUTHENTICATED, NULL);\
static uv_loop_t * loop = NULL;
static struct sockaddr_storage client_addr;
-static uv_tcp_t client_server;
+static uv_tcp_t client_server_tcp;
+static uv_pipe_t client_server_pipe;
static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg);
-static void on_new_connection(uv_stream_t * server, int status);
+static void on_tcp_data(uv_stream_t * client, sirinet_pkg_t * pkg);
+static void on_tcp_new_connection(uv_stream_t * server, int status);
+static void on_pipe_data(uv_stream_t * client, sirinet_pkg_t * pkg);
+static void on_pipe_free(uv_stream_t * client);
+static void on_pipe_new_connection(uv_stream_t * server, int status);
static void on_auth_request(uv_stream_t * client, sirinet_pkg_t * pkg);
static void on_query(uv_stream_t * client, sirinet_pkg_t * pkg);
static void on_insert(uv_stream_t * client, sirinet_pkg_t * pkg);
/* bind loop to the given loop */
loop = siri->loop;
- uv_tcp_init(loop, &client_server);
+ uv_tcp_init(loop, &client_server_tcp);
+ uv_pipe_init(loop, &client_server_pipe, 0);
/* make sure data is set to NULL so we later on can check this value. */
- client_server.data = NULL;
+ client_server_tcp.data = NULL;
if (siri->cfg->bind_client_addr != NULL)
{
(struct sockaddr_in *) &client_addr);
}
- uv_tcp_bind(
- &client_server,
+ rc = uv_tcp_bind(
+ &client_server_tcp,
(const struct sockaddr *) &client_addr,
(siri->cfg->ip_support == IP_SUPPORT_IPV6ONLY) ?
UV_TCP_IPV6ONLY : 0);
+ if (rc)
+ {
+ log_error("Error binding TCP client server: %s", uv_strerror(rc));
+ return 1;
+ }
+
rc = uv_listen(
- (uv_stream_t*) &client_server,
+ (uv_stream_t*) &client_server_tcp,
DEFAULT_BACKLOG,
- on_new_connection);
+ on_tcp_new_connection);
if (rc)
{
- log_error("Error listening client server: %s", uv_strerror(rc));
+ log_error("Error listening TCP client server: %s", uv_strerror(rc));
return 1;
}
- log_info("Start listening for client connections on port %d",
+ log_info("Start listening for TCP client connections on port %d",
siri->cfg->listen_client_port);
+ if (siri->cfg->pipe_support)
+ {
+ char *pipe_name = siri->cfg->pipe_client_name;
+
+ rc = uv_pipe_bind(
+ &client_server_pipe,
+ pipe_name);
+
+ if (rc)
+ {
+ log_error("Error binding pipe client server: %s", uv_strerror(rc));
+ return 1;
+ }
+
+ rc = uv_listen(
+ (uv_stream_t*) &client_server_pipe,
+ DEFAULT_BACKLOG,
+ on_pipe_new_connection);
+
+ if (rc)
+ {
+ log_error("Error listening TCP client server: %s", uv_strerror(rc));
+ return 1;
+ }
+
+ log_info("Start listening for pipe client connections on '%s'",
+ pipe_name);
+ }
+
return 0;
}
-static void on_new_connection(uv_stream_t * server, int status)
+static void on_tcp_new_connection(uv_stream_t * server, int status)
{
- log_debug("Received a client connection request.");
+ log_debug("Received a TCP client connection request.");
if (status < 0)
{
- log_error("Client connection error: %s", uv_strerror(status));
+ log_error("TCP client connection error: %s", uv_strerror(status));
return;
}
uv_tcp_t * client =
- sirinet_socket_new(SOCKET_CLIENT, (on_data_cb_t) &on_data);
+ sirinet_socket_new(SOCKET_CLIENT, (on_data_cb_t) &on_tcp_data);
if (client != NULL)
{
}
}
-static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg)
+static void on_pipe_new_connection(uv_stream_t * server, int status)
{
- if (Logger.level == LOGGER_DEBUG)
+ log_debug("Received a pipe client connection request.");
+
+ if (status < 0)
{
- char addr_port[ADDR_BUF_SZ];
- if (sirinet_addr_and_port(addr_port, client) == 0)
- {
- log_debug(
- "Package received from client '%s' "
- "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %s)",
- addr_port,
- pkg->pid,
- pkg->len,
- sirinet_cproto_client_str(pkg->tp));
- }
+ log_error("Pipe client connection error: %s", uv_strerror(status));
+ return;
}
- else if (pkg->len >= WARNING_PKG_SIZE)
+ uv_pipe_t * client =
+ sirinet_pipe_new(
+ PIPE_CLIENT,
+ (on_data_cb_t) &on_pipe_data,
+ (on_free_cb_t) &on_pipe_free);
+
+ if (client != NULL)
{
- char addr_port[ADDR_BUF_SZ];
- if (sirinet_addr_and_port(addr_port, client) == 0)
+ uv_pipe_init(loop, client, 0);
+
+ if (uv_accept(server, (uv_stream_t *) client) == 0)
{
- log_warning(
- "Got a large package from '%s' (pid: %d, len: %d, tp: %s)."
- " A package size smaller than 1MB is recommended!",
- addr_port,
- pkg->pid,
- pkg->len,
- sirinet_cproto_client_str(pkg->tp));
+ uv_read_start(
+ (uv_stream_t *) client,
+ sirinet_pipe_alloc_buffer,
+ sirinet_pipe_on_data);
+ }
+ else
+ {
+ sirinet_pipe_decref(client);
}
}
+}
+static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg)
+{
/* in case the online flag is not set, we cannot perform any request */
if (siri.status == SIRI_STATUS_RUNNING)
{
}
else
{
- /* data->siridb can be NULL here, make sure we can handle this state */
+ CLIENT_SIRIDB(client, siridb)
+
+ /* siridb can be NULL here, make sure we can handle this state */
CLSERVER_send_server_error(
- ((sirinet_socket_t *) client->data)->siridb,
+ siridb,
client,
pkg);
}
}
+static void on_tcp_data(uv_stream_t * client, sirinet_pkg_t * pkg)
+{
+ if (Logger.level == LOGGER_DEBUG)
+ {
+ char addr_port[ADDR_BUF_SZ];
+ if (sirinet_addr_and_port(addr_port, client) == 0)
+ {
+ log_debug(
+ "Package received from client '%s' "
+ "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %s)",
+ addr_port,
+ pkg->pid,
+ pkg->len,
+ sirinet_cproto_client_str(pkg->tp));
+ }
+ }
+ else if (pkg->len >= WARNING_PKG_SIZE)
+ {
+ char addr_port[ADDR_BUF_SZ];
+ if (sirinet_addr_and_port(addr_port, client) == 0)
+ {
+ log_warning(
+ "Got a large package from '%s' (pid: %d, len: %d, tp: %s)."
+ " A package size smaller than 1MB is recommended!",
+ addr_port,
+ pkg->pid,
+ pkg->len,
+ sirinet_cproto_client_str(pkg->tp));
+ }
+ }
+
+ on_data(client, pkg);
+}
+
+static void on_pipe_data(uv_stream_t * client, sirinet_pkg_t * pkg)
+{
+ if (Logger.level == LOGGER_DEBUG)
+ {
+ char pipe_name[PIPE_NAME_SZ];
+ if (sirinet_pipe_name(pipe_name, client) == 0)
+ {
+ log_debug(
+ "Package received from client '%s' "
+ "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %s)",
+ pipe_name,
+ pkg->pid,
+ pkg->len,
+ sirinet_cproto_client_str(pkg->tp));
+ }
+ }
+ else if (pkg->len >= WARNING_PKG_SIZE)
+ {
+ char pipe_name[PIPE_NAME_SZ];
+ if (sirinet_pipe_name(pipe_name, client) == 0)
+ {
+ log_warning(
+ "Got a large package from '%s' (pid: %d, len: %d, tp: %s)."
+ " A package size smaller than 1MB is recommended!",
+ pipe_name,
+ pkg->pid,
+ pkg->len,
+ sirinet_cproto_client_str(pkg->tp));
+ }
+ }
+
+ on_data(client, pkg);
+}
+
+static void on_pipe_free(uv_stream_t * client)
+{
+ char pipe_name[PIPE_NAME_SZ];
+ if (sirinet_pipe_name(pipe_name, client) == 0)
+ {
+ uv_fs_t req;
+ uv_fs_unlink(loop, &req, pipe_name, NULL);
+ }
+}
+
static void on_auth_request(uv_stream_t * client, sirinet_pkg_t * pkg)
{
cproto_server_t rc;
static void on_query(uv_stream_t * client, sirinet_pkg_t * pkg)
{
- CHECK_SIRIDB(ssocket)
+ CHECK_SIRIDB(client, siridb)
if (pkg->len > MAX_QUERY_PKG_SIZE)
{
if (qp_time_precision.tp == QP_INT64 &&
(tp = (siridb_timep_t) qp_time_precision.via.int64) !=
- ssocket->siridb->time->precision)
+ siridb->time->precision)
{
tp %= SIRIDB_TIME_END;
}
factor = (tp == SIRIDB_TIME_DEFAULT) ? 0.0 :
- pow(1000.0, tp - ssocket->siridb->time->precision);
+ pow(1000.0, tp - siridb->time->precision);
siridb_query_run(
pkg->pid,
static void on_insert(uv_stream_t * client, sirinet_pkg_t * pkg)
{
- CHECK_SIRIDB(ssocket)
+ CHECK_SIRIDB(client, siridb)
+ CLIENT_USER(client, siridb_user)
char err_msg[SIRIDB_MAX_SIZE_ERR_MSG];
if (!siridb_user_check_access(
- (siridb_user_t *) ssocket->origin,
+ siridb_user,
SIRIDB_ACCESS_INSERT,
err_msg))
{
return;
}
- siridb_t * siridb = ssocket->siridb;
-
/* only when when the flag is EXACTLY running or
* running + re-indexing we can continue */
if ( siridb->server->flags != SERVER_FLAG_RUNNING &&
sirinet_pkg_t * pkg,
sirinet_clserver_getfile getfile)
{
- CHECK_SIRIDB(ssocket)
+ CHECK_SIRIDB(client, siridb)
+ CLIENT_USER(client, siridb_user)
- siridb_t * siridb = ssocket->siridb;
sirinet_pkg_t * package = NULL;
char err_msg[SIRIDB_MAX_SIZE_ERR_MSG];
err_msg);
}
else if (!siridb_user_check_access(
- (siridb_user_t *) ssocket->origin,
+ siridb_user,
SIRIDB_ACCESS_PROFILE_FULL,
err_msg))
{
*/
static void on_register_server(uv_stream_t * client, sirinet_pkg_t * pkg)
{
- CHECK_SIRIDB(ssocket)
+ CHECK_SIRIDB(client, siridb)
+ CLIENT_USER(client, siridb_user)
- siridb_t * siridb = ssocket->siridb;
sirinet_pkg_t * package = NULL;
siridb_server_t * new_server = NULL;
char err_msg[SIRIDB_MAX_SIZE_ERR_MSG];
err_msg);
}
else if (!siridb_user_check_access(
- (siridb_user_t *) ssocket->origin,
+ siridb_user,
SIRIDB_ACCESS_PROFILE_FULL,
err_msg))
{
if (servers != NULL && (package = sirinet_pkg_dup(pkg)) != NULL)
{
/* make sure to decrement the client in the callback */
- sirinet_socket_incref(client);
+ sirinet_client_incref(client);
siridb_servers_send_pkg(
servers,
}
/* decref the client */
- sirinet_socket_decref(server_reg->client);
+ sirinet_client_decref(server_reg->client);
/* free server register object */
free(server_reg);
}
-
--- /dev/null
+#include <assert.h>
+#include <logger/logger.h>
+#include <siri/admin/client.h>
+#include <siri/err.h>
+#include <siri/net/protocol.h>
+#include <siri/net/pipe.h>
+#include <siri/siri.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define MAX_ALLOWED_PKG_SIZE 20971520 /* 20 MB */
+
+#define QUIT_PIPE \
+ free(spipe->buf); \
+ spipe->buf = NULL; \
+ spipe->len = 0; \
+ spipe->size = 0; \
+ spipe->on_data = NULL; \
+ sirinet_pipe_decref(client); \
+ return;
+
+/*
+ * This function can raise a SIGNAL.
+ */
+void sirinet_pipe_alloc_buffer(
+ uv_handle_t * handle,
+ size_t suggested_size,
+ uv_buf_t * buf)
+{
+ sirinet_pipe_t * spipe = (sirinet_pipe_t *) handle->data;
+
+ if (!spipe->len && spipe->size > RESET_BUF_SIZE)
+ {
+ free(spipe->buf);
+ spipe->buf = (char *) malloc(suggested_size);
+ if (spipe->buf == NULL)
+ {
+ ERR_ALLOC
+ buf->len = 0;
+ return;
+ }
+ spipe->size = suggested_size;
+ spipe->len = 0;
+ }
+ buf->base = spipe->buf + spipe->len;
+ buf->len = spipe->size - spipe->len;
+}
+
+/*
+ * Buffer should have a size of PIPE_NAME_SZ
+ *
+ * Return 0 if successful or -1 in case of an error.
+ */
+int sirinet_pipe_name(char * buffer, uv_stream_t * client)
+{
+ size_t len = PIPE_NAME_SZ - 1;
+
+ if (uv_pipe_getsockname(
+ (uv_pipe_t *) client,
+ buffer,
+ &len))
+ {
+ return -1;
+ }
+
+ buffer[len] = 0;
+ return 0;
+}
+
+/*
+ * This function can raise a SIGNAL.
+ */
+void sirinet_pipe_on_data(
+ uv_stream_t * client,
+ ssize_t nread,
+ const uv_buf_t * buf)
+{
+ sirinet_pipe_t * spipe = (sirinet_pipe_t *) client->data;
+ sirinet_pkg_t * pkg;
+ size_t total_sz;
+ uint8_t check;
+
+ /*
+ * spipe->on_data is NULL when 'sirinet_pipe_decref' is called from
+ * within this function. We should never call 'sirinet_pipe_decref' twice
+ * so the best thing is to log and and exit this function.
+ */
+ if (spipe->on_data == NULL)
+ {
+ char pipe_name[PIPE_NAME_SZ];
+ if (sirinet_pipe_name(pipe_name, client) == 0)
+ {
+ log_error(
+ "Received data from '%s' but we ignore the data since the "
+ "connection will be closed in a few seconds...",
+ pipe_name);
+ }
+ return;
+ }
+
+ if (nread < 0)
+ {
+ if (nread != UV_EOF)
+ {
+ log_error("Read error: %s", uv_err_name(nread));
+ }
+ QUIT_PIPE
+ }
+
+ spipe->len += nread;
+
+ if (spipe->len < sizeof(sirinet_pkg_t))
+ {
+ return;
+ }
+
+ pkg = (sirinet_pkg_t *) spipe->buf;
+ check = pkg->tp ^ 255;
+ if ( check != pkg->checkbit ||
+ (spipe->tp == PIPE_CLIENT && pkg->len > MAX_ALLOWED_PKG_SIZE))
+ {
+ char pipe_name[PIPE_NAME_SZ];
+ if (sirinet_pipe_name(pipe_name, client) == 0)
+ {
+ log_error(
+ "Got an illegal package or size too large from '%s', "
+ "closing connection "
+ "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %" PRIu8 ")",
+ pipe_name, pkg->pid, pkg->len, pkg->tp);
+ }
+ QUIT_PIPE
+ }
+
+ total_sz = sizeof(sirinet_pkg_t) + pkg->len;
+ if (spipe->len < total_sz)
+ {
+ if (spipe->size < total_sz)
+ {
+ char * tmp = realloc(spipe->buf, total_sz);
+ if (tmp == NULL)
+ {
+ log_critical(
+ "Cannot allocate size for package "
+ "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %" PRIu8 ")",
+ pkg->pid, pkg->len, pkg->tp);
+ QUIT_PIPE
+ }
+ spipe->buf = tmp;
+ spipe->size = total_sz;
+ }
+ return;
+ }
+
+ /* call on-data function */
+ (*spipe->on_data)(client, pkg);
+
+ spipe->len -= total_sz;
+
+ if (spipe->len > 0)
+ {
+ /* move data and call sirinet_pipe_on_data() function again */
+ memmove(spipe->buf, spipe->buf + total_sz, spipe->len);
+ sirinet_pipe_on_data(client, 0, buf);
+ }
+}
+
+/*
+ * Returns NULL and raises a SIGNAL in case an error has occurred.
+ *
+ * Note: ((sirinet_pipe_t *) pipe->data)->ref is initially set to 1
+ */
+uv_pipe_t * sirinet_pipe_new(
+ sirinet_pipe_tp_t tp,
+ on_data_cb_t cb_data,
+ on_free_cb_t cb_free)
+{
+ sirinet_pipe_t * spipe =
+ (sirinet_pipe_t *) malloc(sizeof(sirinet_pipe_t));
+
+ if (spipe == NULL)
+ {
+ ERR_ALLOC
+ return NULL;
+ }
+
+ spipe->tp = tp;
+ spipe->on_data = cb_data;
+ spipe->on_free = cb_free;
+ spipe->buf = NULL;
+ spipe->len = 0;
+ spipe->size = -1; /* this will force allocating on first request */
+ spipe->origin = NULL;
+ spipe->siridb = NULL;
+ spipe->ref = 1;
+ spipe->pipe.data = spipe;
+
+ return &spipe->pipe;
+}
+
+/*
+ * Never use this function but call sirinet_pipe_decref.
+ * Destroy pipe. (parsing NULL is not allowed)
+ *
+ * We know three different pipe types:
+ * - client: used for clients. a user object might be destroyed.
+ * - back-end: used to connect to other servers. a server might be destroyed.
+ * - server: user for severs connecting to here. a server might be destroyed.
+ *
+ * In case a server is destroyed, remaining promises will be cancelled and
+ * the call-back functions will be called.
+ */
+void sirinet__pipe_free(uv_stream_t * client)
+{
+ sirinet_pipe_t * spipe = client->data;
+
+#if DEBUG
+ log_debug("Free pipe type: %d", spipe->tp);
+#endif
+
+ switch (spipe->tp)
+ {
+ case PIPE_CLIENT: /* listens to client connections */
+ if (spipe->origin != NULL)
+ {
+ siridb_user_t * user = (siridb_user_t *) spipe->origin;
+ siridb_user_decref(user);
+ }
+ break;
+ case PIPE_BACKEND: /* listens to server connections */
+ if (spipe->origin != NULL)
+ {
+ siridb_server_t * server = (siridb_server_t *) spipe->origin;
+ siridb_server_decref(server);
+ }
+ break;
+ }
+
+ if (spipe->on_free != NULL)
+ {
+ spipe->on_free(client);
+ }
+
+ free(spipe->buf);
+ free(spipe);
+}
#include <logger/logger.h>
#include <siri/err.h>
#include <siri/net/pkg.h>
-#include <siri/net/socket.h>
+#include <siri/net/clserver.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
}
/* increment client reference counter */
- sirinet_socket_incref(client);
+ sirinet_client_incref(client);
data->client = client;
data->pkg = pkg;
pkg_send_t * data = (pkg_send_t *) req->data;
- sirinet_socket_decref(data->client);
+ sirinet_client_decref(data->client);
free(data->pkg);
free(data);
#include <siri/help/help.h>
#include <siri/net/promises.h>
#include <siri/net/protocol.h>
-#include <siri/net/socket.h>
+#include <siri/net/clserver.h>
#include <siri/parser/listener.h>
#include <siri/parser/queries.h>
#include <siri/siri.h>
/*
* Start SIRIPARSER_MASTER_CHECK_ACCESS
*/
-#define SIRIPARSER_MASTER_CHECK_ACCESS(ACCESS_BIT) \
+#define SIRIPARSER_MASTER_CHECK_ACCESS(user, ACCESS_BIT) \
if (IS_MASTER && \
!siridb_user_check_access( \
- (siridb_user_t *) ((sirinet_socket_t *) query->client->data)->origin, \
+ user, \
ACCESS_BIT, \
query->err_msg)) \
{ \
static void enter_alter_group(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_alter_t * q_alter = (query_alter_t *) query->data;
MASTER_CHECK_ACCESSIBLE(siridb)
static void enter_alter_server(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_alter_t * q_alter = (query_alter_t *) query->data;
siridb_server_t * server = siridb_server_from_node(
siridb,
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_ALTER)
+ CLIENT_USER(query->client, siridb_user)
+ SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_ALTER)
#if DEBUG
assert (query->packer == NULL);
static void enter_alter_user(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
MASTER_CHECK_ACCESSIBLE(siridb)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_COUNT)
+ CLIENT_USER(query->client, siridb_user)
+ SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_COUNT)
#if DEBUG
assert (query->packer == NULL);
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_CREATE)
+ CLIENT_USER(query->client, siridb_user)
+ SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_CREATE)
SIRIPARSER_NEXT_NODE
}
assert (query->packer == NULL);
#endif
- SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_DROP)
+ CLIENT_USER(query->client, siridb_user)
+ SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_DROP)
query->packer = sirinet_packer_new(1024);
static void enter_grant_user(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
-
- SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_GRANT)
+ CLIENT_SIRIDB(query->client, siridb)
+ CLIENT_USER(query->client, siridb_user)
+ SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_GRANT)
MASTER_CHECK_ACCESSIBLE(siridb)
cleri_node_t * user_node =
static void enter_group_match(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
cleri_node_t * node = query->nodes->node;
query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
static void enter_limit_expr(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_list_t * qlist = (query_list_t *) query->data;
int64_t limit = query->nodes->node->children->next->node->result;
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_LIST)
+ CLIENT_USER(query->client, siridb_user)
+ SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_LIST)
#if DEBUG
assert (query->packer == NULL);
static void enter_revoke_user(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
-
- SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_REVOKE)
+ CLIENT_SIRIDB(query->client, siridb)
+ CLIENT_USER(query->client, siridb_user)
+ SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_REVOKE)
MASTER_CHECK_ACCESSIBLE(siridb)
cleri_node_t * user_node =
static void enter_select_stmt(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_select_t * q_select;
cleri_children_t * child;
int skip_get_points;
- SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_SELECT)
+ CLIENT_USER(query->client, siridb_user)
+ SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_SELECT)
MASTER_CHECK_ACCESSIBLE(siridb)
#if DEBUG
static void enter_set_expression(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
cleri_node_t * node = query->nodes->node->children->next->next->node;
query_alter_t * q_alter = (query_alter_t *) query->data;
static void enter_set_name(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
cleri_node_t * name_node =
query->nodes->node->children->next->next->node;
{
siridb_query_t * query = (siridb_query_t *) handle->data;
cleri_node_t * node = query->nodes->node;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
siridb_series_t * series = NULL;
uint16_t pool;
static void enter_series_all(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
siridb_series_t * series;
query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
static void enter_series_re(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
cleri_node_t * node = query->nodes->node;
query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
static void exit_alter_group(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
+ CLIENT_SIRIDB(query->client, siridb)
- if (siridb_groups_save(
- ((sirinet_socket_t *) query->client->data)->siridb->groups))
+ if (siridb_groups_save(siridb->groups))
{
FILE_ERR_RET
}
static void exit_alter_user(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
+ CLIENT_SIRIDB(query->client, siridb)
- if (siridb_users_save(((sirinet_socket_t *) query->client->data)->siridb))
+ if (siridb_users_save(siridb))
{
FILE_ERR_RET
}
static void exit_count_groups(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_count_t * q_count = (query_count_t *) query->data;
if (q_count->where_expr == NULL || !cexpr_contains(
static void exit_count_pools(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_count_t * q_count = (query_count_t *) query->data;
siridb_pool_t * pool = siridb->pools->pool + siridb->server->pool;
static void exit_count_series(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_count_t * q_count = (query_count_t *) query->data;
MASTER_CHECK_ONLINE(siridb)
static void exit_count_series_length(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_count_t * q_count = (query_count_t *) query->data;
MASTER_CHECK_ACCESSIBLE(siridb)
static void exit_count_servers(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_count_t * q_count = (query_count_t *) query->data;
cexpr_t * where_expr = q_count->where_expr;
cexpr_cb_t cb = (cexpr_cb_t) siridb_server_cexpr_cb;
static void exit_count_servers_received(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_count_t * q_count = (query_count_t *) query->data;
cexpr_t * where_expr = q_count->where_expr;
cexpr_cb_t cb = (cexpr_cb_t) siridb_server_cexpr_cb;
static void exit_count_servers_selected(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_count_t * q_count = (query_count_t *) query->data;
cexpr_t * where_expr = q_count->where_expr;
cexpr_cb_t cb = (cexpr_cb_t) siridb_server_cexpr_cb;
static void exit_count_shards(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_count_t * q_count = (query_count_t *) query->data;
qp_add_raw(query->packer, (const unsigned char *) "shards", 6);
static void exit_count_shards_size(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_count_t * q_count = (query_count_t *) query->data;
uint64_t duration;
size_t i;
static void exit_count_users(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
llist_node_t * node = siridb->users->first;
cexpr_t * where_expr = ((query_count_t *) query->data)->where_expr;
cexpr_cb_t cb = (cexpr_cb_t) siridb_user_cexpr_cb;
static void exit_create_group(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
cleri_node_t * name_nd =
query->nodes->node->children->next->node;
static void exit_create_user(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
siridb_user_t * user = ((query_alter_t *) query->data)->via.user;
cleri_node_t * user_node =
query->nodes->node->children->next->node;
static void exit_drop_group(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
MASTER_CHECK_ACCESSIBLE(siridb)
static void exit_drop_series(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_drop_t * q_drop = (query_drop_t *) query->data;
MASTER_CHECK_ACCESSIBLE(siridb)
static void exit_drop_server(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
siridb_server_t * server = siridb_server_from_node(
siridb,
query->nodes->node->children->next->node->children->node,
static void exit_drop_shards(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_drop_t * q_drop = (query_drop_t *) query->data;
MASTER_CHECK_ACCESSIBLE(siridb)
static void exit_drop_user(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
MASTER_CHECK_ACCESSIBLE(siridb)
strx_extract_string(username, user_node->str, user_node->len);
if (siridb_users_drop_user(
- ((sirinet_socket_t *) query->client->data)->siridb,
+ siridb,
username,
query->err_msg))
{
static void exit_grant_user(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
+ CLIENT_SIRIDB(query->client, siridb)
- if (siridb_users_save(((sirinet_socket_t *) query->client->data)->siridb))
+ if (siridb_users_save(siridb))
{
sprintf(query->err_msg, "Could not write users to file!");
log_critical(query->err_msg);
static void exit_list_groups(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_list_t * q_list = (query_list_t *) query->data;
int is_local = (q_list->props == NULL);
static void exit_list_pools(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
query_list_t * q_list = (query_list_t *) query->data;
siridb_pool_t * pool = siridb->pools->pool + siridb->server->pool;
siridb_pool_walker_t wpool = {
{
siridb_query_t * query = (siridb_query_t *) handle->data;
query_list_t * q_list = (query_list_t *) query->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
if (q_list->props == NULL)
{
static void exit_list_servers(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
+
query_list_t * q_list = (query_list_t *) query->data;
cexpr_t * where_expr = q_list->where_expr;
int is_local = IS_MASTER;
static void exit_list_shards(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
+
query_list_t * q_list = (query_list_t *) query->data;
uint_fast16_t prop;
uint64_t duration;
static void exit_list_users(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- llist_node_t * node =
- ((sirinet_socket_t *) query->client->data)->siridb->users->first;
+ CLIENT_SIRIDB(query->client, siridb)
+
+ llist_node_t * node = siridb->users->first;
slist_t * props = ((query_list_t *) query->data)->props;
cexpr_cb_t cb = (cexpr_cb_t) siridb_user_cexpr_cb;
query_list_t * q_list = (query_list_t *) query->data;
static void exit_revoke_user(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
+ CLIENT_SIRIDB(query->client, siridb)
- if (siridb_users_save(((sirinet_socket_t *) query->client->data)->siridb))
+ if (siridb_users_save(siridb))
{
sprintf(query->err_msg, "Could not write users to file!");
log_critical(query->err_msg);
static void exit_set_address(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
siridb_server_t * server = ((query_alter_t *) query->data)->via.server;
cleri_node_t * node = query->nodes->node->children->next->next->node;
+ CLIENT_SIRIDB(query->client, siridb)
if (siridb->server == server || server->socket != NULL)
{
static void exit_set_backup_mode(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
#if DEBUG
assert (query->data != NULL);
static void exit_set_drop_threshold(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
MASTER_CHECK_ACCESSIBLE(siridb)
static void exit_set_list_limit(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
MASTER_CHECK_ACCESSIBLE(siridb)
MASTER_CHECK_VERSION(siridb, "2.0.17")
static void exit_set_log_level(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
query_alter_t * q_alter = (query_alter_t *) query->data;
+ CLIENT_SIRIDB(query->client, siridb)
#if DEBUG
assert (query->data != NULL);
static void exit_set_port(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
siridb_server_t * server = ((query_alter_t *) query->data)->via.server;
cleri_node_t * node = query->nodes->node->children->next->next->node;
+ CLIENT_SIRIDB(query->client, siridb)
if (siridb->server == server || server->socket != NULL)
{
static void exit_set_select_points_limit(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
MASTER_CHECK_ACCESSIBLE(siridb)
MASTER_CHECK_VERSION(siridb, "2.0.17")
static void exit_set_timezone(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
cleri_node_t * node = query->nodes->node->children->next->next->node;
+ CLIENT_SIRIDB(query->client, siridb)
MASTER_CHECK_ACCESSIBLE(siridb)
static void exit_show_stmt(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
-
- SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_SHOW)
+ CLIENT_SIRIDB(query->client, siridb)
+ CLIENT_USER(query->client, siridb_user)
+ SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_SHOW)
cleri_children_t * children =
query->nodes->node->children->next->node->children;
qp_add_raw(query->packer, (const unsigned char *) "data", 4);
qp_add_type(query->packer, QP_ARRAY_OPEN);
- siridb_user_t * user = ((sirinet_socket_t *) query->client->data)->origin;
+ CLIENT_USER(query->client, user)
who_am_i = user->name;
if (children->node == NULL)
{
continue;
}
- prop_cb(((sirinet_socket_t *) query->client->data)->siridb,
- query->packer, 1);
+ prop_cb(siridb, query->packer, 1);
}
}
else
#if DEBUG
assert (prop_cb != NULL); /* all props are implemented */
#endif
- prop_cb(((sirinet_socket_t *) query->client->data)->siridb,
- query->packer, 1);
+ prop_cb(siridb, query->packer, 1);
if (children->next == NULL)
{
static void exit_timeit_stmt(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
+ CLIENT_SIRIDB(query->client, siridb)
+
struct timespec end;
- char * name =
- ((sirinet_socket_t *) query->client->data)->siridb->server->name;
+ char * name = siridb->server->name;
clock_gettime(CLOCK_REALTIME, &end);
qp_add_type(query->timeit, QP_MAP2);
static void async_drop_series(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
query_drop_t * q_drop = (query_drop_t *) query->data;
+ CLIENT_SIRIDB(query->client, siridb)
siridb_series_t * series;
uint8_t async_more = 0;
{
siridb_query_t * query = (siridb_query_t *) handle->data;
query_drop_t * q_drop = (query_drop_t *) query->data;
+ CLIENT_SIRIDB(query->client, siridb)
if (q_drop->shards_list->len)
{
siridb_shard_drop(
shard,
- ((sirinet_socket_t *) query->client->data)->siridb);
+ siridb);
siridb_shard_decref(shard);
}
{
siridb_query_t * query = (siridb_query_t *) handle->data;
query_select_t * q_select = (query_select_t *) query->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
uint8_t async_more = 0;
siridb_series_t * series;
siridb_points_t * points;
{
siridb_query_t * query = (siridb_query_t *) handle->data;
query_select_t * q_select = (query_select_t *) query->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
uint8_t async_more = 0;
siridb_series_t * series;
siridb_points_t * points;
sirinet_promise_t * promise;
qp_unpacker_t unpacker;
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
siridb_group_t * group;
qp_obj_t qp_name;
qp_obj_t qp_series;
sirinet_promise_t * promise;
qp_unpacker_t unpacker;
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
size_t err_count = 0;
query_select_t * q_select = (query_select_t *) query->data;
qp_obj_t qp_name;
uv_async_t * handle = (uv_async_t *) work->data;
siridb_query_t * query = (siridb_query_t *) handle->data;
query_select_t * q_select = (query_select_t *) query->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+ CLIENT_SIRIDB(query->client, siridb)
siridb->selected_points += q_select->n;
int rc = ct_items(
q_select->result,
static void finish_list_groups(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
query_list_t * q_list = (query_list_t *) query->data;
+ CLIENT_SIRIDB(query->client, siridb)
if (q_list->props == NULL)
{
static void finish_count_groups(uv_async_t * handle)
{
siridb_query_t * query = (siridb_query_t *) handle->data;
- siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
query_count_t * q_count = (query_count_t *) query->data;
+ CLIENT_SIRIDB(query->client, siridb)
/* Note: ct_values(..values_count_groups..) can only result in a positive
* value.
#include <siri/help/help.h>
#include <siri/net/bserver.h>
#include <siri/net/clserver.h>
+#include <siri/net/pipe.h>
#include <siri/net/socket.h>
#include <siri/parser/listener.h>
#include <siri/siri.h>
switch (handle->type)
{
- case UV_WORK:
- break;
case UV_SIGNAL:
/* this is where we cleanup the signal handlers */
uv_close(handle, NULL);
}
break;
+ case UV_NAMED_PIPE:
+ /* This can be a pipe server with data set to NULL or a SiriDB pipe
+ * which should be destroyed.
+ */
+ if (handle->data == NULL)
+ {
+ uv_close(handle, NULL);
+ }
+ else
+ {
+ sirinet_pipe_decref(handle);
+ }
+ break;
+
case UV_TIMER:
/* we do not expect any timer object since they should all be closed
* (or at least closing) at this point.